/*###################################################################
  > File Name: storage/controller/volume_proto_readcache.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Thu 24 Aug 2017 11:51:27 PM PDT
###################################################################*/
#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "dbg.h"
#include "adt.h"
#include "nodectl.h"
#include "sysy_lib.h"
#include "ypage.h"
#include "sysutil.h"
#include "configure.h"
#include "volume_proto.h"
#include "metadata.h"
#include "core.h"
#include "md_proto.h"
#include "md_map.h"
#include "net_global.h"
#include "volume_ctl.h"
#include "volume.h"
#include "ramdisk.h"
#include "ylog.h"
#include "redis_utils.h"
#include "volume_proto_readcache.h"


#define MAX_READCACHE_KEY_LEN MAX_NAME_LEN

#define READCACHE_PAGE_SIZE (1<<12)  /* 4096 */
#define READCACHE_KEY_FORMAT "%ju.%ju.%ju"  /* idx.version.offset (4k)*/

#define READCACHE_INFO_INTERVAL (1)
#define READCACHE_INFO "readcache/info"

#define __READCACHE_HASH__ (256)


/* Operation codes carried by an entry_t and dispatched by the redis worker
 * (see the switch in __readcache_redis_worker__). */
typedef enum {
        RDCACHE_OP_GET = 0,     /* single-page read  (unused: path is under #if 0) */
        RDCACHE_OP_SET,         /* single-page write (unused: path is under #if 0) */
        RDCACHE_OP_MGET,        /* batched multi-key read */
        RDCACHE_OP_MSET,        /* batched multi-key write */

        RDCACHE_OP_MAX,
} type_t;

/*
 * Per-worker context: one of __READCACHE_HASH__ instances, each owning a
 * private redis connection, a queue of pending operations and a dedicated
 * worker thread that drains the queue (__readcache_redis_worker).
 */
typedef struct {
        int idx;                /* slot index in __readcache_ctx__[] */
        sem_t sem;              /* posted once per queued entry */
        sy_spinlock_t lock;     /* protects 'list' */
        redis_ctx_t *ctx;       /* this worker's redis connection */
        struct list_head list;  /* pending entry_t queue (linked via entry_t.hook) */
} readcache_ctx_t;

/*
 * One queued cache operation.  The submitter parks itself via
 * schedule_yield() after enqueueing; the worker fills in 'ret' and wakes
 * it with schedule_resume().
 */
typedef struct {
        struct list_head hook;  /* linkage into readcache_ctx_t.list; MUST stay
                                 * first: workers cast list nodes to entry_t */

        type_t op;
        union {
                struct {        /* RDCACHE_OP_GET / RDCACHE_OP_SET */
                        char *key;
                        void *buf;
                } lba;
                mctx_t mctx;    /* RDCACHE_OP_MGET / RDCACHE_OP_MSET */
        } arg;

        int ret;                /* operation result, written by the worker */
        task_t task;            /* task to resume on completion */

} entry_t;

static readcache_ctx_t *__readcache_ctx__[__READCACHE_HASH__];

static void * __readcache_redis_worker(void *arg);
static int __readcache_redis_worker__(readcache_ctx_t *ctx);

static inline int readcache_trans(const chkid_t *chkid, uint64_t info_version, uint64_t lba, char *key);

#if 0
static int readcache_get(const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf);
static int readcache_set(const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf);
#endif

static int readcache_mset_init(entry_t **_ent);
static int readcache_mget_init(entry_t **_ent);

static int readcache_mset_append(entry_t *ent, const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf);
static int readcache_mget_append(entry_t *ent, const chkid_t *chkid, uint64_t info_version, uint64_t lba);

static int readcache_mset_exec(entry_t *ent);
static int readcache_mget_exec(entry_t *ent);

static int readcache_mset_destory(entry_t **ent);
static int readcache_mget_destory(entry_t **ent);

/*         info            */
/* Global hit statistics; updated without locking in
 * __readcache_update_hit__ (best-effort counters). */
typedef struct {
        uint64_t pages;         /* total 4k pages requested */
        uint64_t hit;           /* pages served from the cache */
} hit_ratio_t;

static hit_ratio_t hit_ratio;
static time_t __last_dump__;
static sy_rwlock_t info_lock;

static void __readcache_update_hit__(uint64_t pages, uint64_t hit);
static void __readcache_info_dump__();

static int __inited__ = 0;

/* Nonzero once readcache_init() has completed and published readiness. */
inline int readcache_ready()
{
        return __inited__ ? 1 : 0;
}

/* Publish readiness; after this call readcache_ready() returns true. */
void readcache_set_ready()
{
        __inited__ = 1;
}

/*
 * Bring up the read cache: one redis connection plus one detached worker
 * thread per hash slot, then flush the backing store and publish readiness.
 *
 * @path   unix socket path of the redis instance
 * @return 0 on success, an error code otherwise.
 *
 * Fixes over the previous version:
 *  - sem_init() signals failure by returning -1 and setting errno; the
 *    errno value is now propagated instead of -1;
 *  - the context array is freed on failures before any worker thread was
 *    created; once a worker exists it holds a pointer into the array, so
 *    later failures deliberately leak it instead of freeing it out from
 *    under a running thread (the old code freed unconditionally).
 */
int readcache_init(const char *path)
{
        int i, ret;
        char *ptr;
        pthread_t th;
        pthread_attr_t ta;
        redis_ctx_t *redis_ctx;
        readcache_ctx_t *cache_ctx;

        ret = sy_rwlock_init(&info_lock, "readcache.info");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&ptr, sizeof(readcache_ctx_t) * __READCACHE_HASH__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < __READCACHE_HASH__; i++) {

                ret = connect_redis_unix(path, &redis_ctx);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                cache_ctx = (readcache_ctx_t *)(ptr + i * sizeof(readcache_ctx_t));

                cache_ctx->idx = i;
                cache_ctx->ctx = redis_ctx;

                /* sem_init() reports errors via errno, not a return code */
                if (sem_init(&cache_ctx->sem, 0, 0) == -1) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                ret = sy_spin_init(&cache_ctx->lock);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                INIT_LIST_HEAD(&cache_ctx->list);

                __readcache_ctx__[i] = cache_ctx;

                /* detached: never joined, exits with the process */
                (void) pthread_attr_init(&ta);
                (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

                ret = pthread_create(&th, &ta, __readcache_redis_worker, __readcache_ctx__[i]);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        /* start from an empty cache so stale pages cannot alias new data */
        ret = flush_redis(__readcache_ctx__[0]->ctx);
        if (unlikely(ret))
                GOTO(err_disconn, ret);

        hit_ratio.pages = 0;
        hit_ratio.hit = 0;

        readcache_set_ready();

        return 0;
err_free:
        /* Only safe to free before the first worker thread was created;
         * afterwards the workers hold pointers into 'ptr', so leak it. */
        if (i == 0)
                yfree((void**)&ptr);
err_disconn:
        //TODO disconnect the per-slot redis connections
err_ret:
        return ret;
}

/*
 * Pick a worker slot at random to spread load across the redis connections.
 *
 * Fix: _random() returns an int; if it can be negative (TODO confirm its
 * contract), the old "int % 256" produced a negative value and a
 * subsequent out-of-bounds read of __readcache_ctx__[].  Reducing in
 * unsigned arithmetic makes the result always in [0, __READCACHE_HASH__).
 */
static int __readcache_hash__()
{
        unsigned int hash = (unsigned int)_random();
        return (int)(hash % __READCACHE_HASH__);
}

/*
 * Queue 'ent' on a randomly chosen worker and park the calling task until
 * the worker has executed it.  The worker writes ent->ret and calls
 * schedule_resume() on ent->task (see __readcache_redis_worker__);
 * presumably schedule_yield() then returns that value — confirm against
 * the scheduler's contract.
 */
static int __readcache_redis_commit(entry_t *ent)
{
        int ret;
        readcache_ctx_t *rdctx = NULL;

        int hash = __readcache_hash__();

        rdctx = __readcache_ctx__[hash];

        YASSERT(rdctx->idx == hash);

        ret = sy_spin_lock(&rdctx->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_add_tail(&ent->hook, &rdctx->list);

        sy_spin_unlock(&rdctx->lock);

        /* wake the worker thread blocked in _sem_wait() */
        sem_post(&rdctx->sem);

        /* park until the worker resumes us */
        ret = schedule_yield("readcache redis commit", NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Drain this worker's pending queue once: splice the whole shared list out
 * under the spinlock, execute each entry against the worker's private
 * redis connection, then resume the parked submitter with the result.
 * Called from __readcache_redis_worker() after each semaphore wakeup.
 */
static int __readcache_redis_worker__(readcache_ctx_t *ctx)
{
        int ret, len;
        entry_t *ent;
        mctx_t *mctx;
        struct list_head list;
        struct list_head *pos, *n;

        redis_ctx_t *redis_ctx = ctx->ctx;

        INIT_LIST_HEAD(&list);

        ret = sy_spin_lock(&ctx->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* take the whole queue in one shot to keep lock hold time minimal */
        list_splice_init(&ctx->list, &list);

        sy_spin_unlock(&ctx->lock);

        if (!list_empty(&list)) {
                list_for_each_safe(pos, n , &list) {

                        /* entry_t has its list hook as first member, so the
                         * node pointer doubles as the entry pointer */
                        ent = (entry_t *)pos;

                        switch (ent->op)
                        {
                        case RDCACHE_OP_GET:
                                ent->ret = kget(redis_ctx, ent->arg.lba.key, ent->arg.lba.buf, &len);
                                if (!ent->ret)
                                        /* cache values are always whole 4k pages */
                                        YASSERT(len == READCACHE_PAGE_SIZE);
                                break;
                        case RDCACHE_OP_SET:
                                ent->ret = kset(redis_ctx, ent->arg.lba.key, ent->arg.lba.buf, READCACHE_PAGE_SIZE);
                                break;
                        case RDCACHE_OP_MSET:
                                mctx = &ent->arg.mctx;
                                DBUG("idx %d mset(%d) %d\n", ctx->idx, mctx->type, mctx->segcount);
                                ent->ret = multi_exec(mctx, redis_ctx);
                                break;
                        case RDCACHE_OP_MGET:
                                mctx = &ent->arg.mctx;
                                DBUG("idx %d mget(%d) %d\n", ctx->idx, mctx->type, mctx->segcount);
                                ent->ret =  multi_exec(mctx, redis_ctx);
                                break;
                        default:
                                DWARN("Should not run here!!!\n");
                                YASSERT(0);
                                break;
                        }

                        list_del(pos);
                        /* hand the result back to the parked submitter */
                        schedule_resume(&ent->task, ent->ret, NULL);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Worker thread entry point: wait on the slot's semaphore, drain the queue,
 * repeat while the service is running.  Any internal failure is fatal.
 */
static void * __readcache_redis_worker(void *arg)
{
        readcache_ctx_t *me = arg;
        int ret;

        DINFO("readcache redis worker %d start up\n", me->idx);

        for (;;) {
                if (!srv_running)
                        break;

                ret = _sem_wait(&me->sem);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __readcache_redis_worker__(me);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}


static inline int readcache_trans(const chkid_t *chkid, uint64_t info_version, uint64_t offset, char *key)
{
        YASSERT(!(offset % READCACHE_PAGE_SIZE));

        snprintf(key, MAX_READCACHE_KEY_LEN, ""READCACHE_KEY_FORMAT"", chkid->id, info_version, offset);

        return 0;
}

#if 0
/*
 * Fetch one cached 4k page via a blocking single-key GET.  Compiled out in
 * favour of the batched MGET path, kept for reference/debugging.
 *
 * Fix: the yfree() calls passed ent->arg.lba.key by value instead of by
 * address (missing '&'), so yfree() would have treated the key's *contents*
 * as a pointer slot — corrupting the key buffer instead of freeing it.
 */
static int readcache_get(const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf)
{
        int ret;
        entry_t *ent;

        YASSERT(lba % READCACHE_PAGE_SIZE == 0);

        ret = ymalloc((void **)&ent, sizeof(entry_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->op = RDCACHE_OP_GET;
        ent->arg.lba.buf = buf;
        ent->task = schedule_task_get();

        ret = ymalloc((void **)&ent->arg.lba.key, MAX_READCACHE_KEY_LEN);
        if (unlikely(ret))
                GOTO(err_free, ret);

        readcache_trans(chkid, info_version, lba, ent->arg.lba.key);

        ret = __readcache_redis_commit(ent);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        yfree((void **)&ent->arg.lba.key);
        yfree((void **)&ent);
        return 0;

err_free1:
        yfree((void **)&ent->arg.lba.key);
err_free:
        yfree((void **)&ent);
err_ret:
        return ret;
}

/*    mset    */
static int readcache_set(const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf)
{
        int ret;
        entry_t *ent;

        ret = ymalloc((void **)&ent, sizeof(entry_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent->op = RDCACHE_OP_SET;
        ent->arg.lba.buf = buf;
        ent->task = schedule_task_get();

        ret = ymalloc((void **)&ent->arg.lba.key, MAX_READCACHE_KEY_LEN);
        if (unlikely(ret))
                GOTO(err_free, ret);

        readcache_trans(chkid, info_version, lba, ent->arg.lba.key);

        ret = __readcache_redis_commit(ent);
        if (unlikely(ret))
                GOTO(err_free1, ret);


        yfree((void **)ent->arg.lba.key);
        yfree((void **)&ent);
        return 0;

err_free1:
        yfree((void **)ent->arg.lba.key);
err_free:
        yfree((void **)&ent);
err_ret:
        return ret;
}
#endif

/*
 * Allocate and initialise a batched-MSET entry; on success *_ent owns the
 * entry and must be released with readcache_mset_destory().
 *
 * Fix: if multi_exec_init() failed, the freshly allocated entry was leaked
 * and *_ent left pointing at a half-initialised object.
 */
static int readcache_mset_init(entry_t **_ent)
{
        int ret;
        entry_t *ent;

        ret = ymalloc((void **)_ent, sizeof(entry_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = *_ent;

        ent->op = RDCACHE_OP_MSET;
        ent->task = schedule_task_get();

        ret = multi_exec_init(&ent->arg.mctx, REDIS_MSET);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)_ent);
err_ret:
        return ret;
}

/*
 * Allocate and initialise a batched-MGET entry; on success *_ent owns the
 * entry and must be released with readcache_mget_destory().
 *
 * Fix: if multi_exec_init() failed, the freshly allocated entry was leaked
 * and *_ent left pointing at a half-initialised object.
 */
static int readcache_mget_init(entry_t **_ent)
{
        int ret;
        entry_t *ent;

        ret = ymalloc((void **)_ent, sizeof(entry_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = *_ent;

        ent->op = RDCACHE_OP_MGET;
        ent->task = schedule_task_get();

        ret = multi_exec_init(&ent->arg.mctx, REDIS_MGET);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)_ent);
err_ret:
        return ret;
}

/*
 * Queue one aligned 4k page write on the pending MSET batch: allocate a
 * key buffer, render the page key and hand key+value to the mctx.
 * NOTE(review): the key buffer's ownership after multi_exec_append() —
 * including on its failure — is not visible here; confirm it is released
 * by multi_exec_destory().
 */
static int readcache_mset_append(entry_t *ent, const chkid_t *chkid, uint64_t info_version, uint64_t lba, void *buf)
{
        int ret;
        char *pagekey;

        YASSERT(lba % READCACHE_PAGE_SIZE == 0);

        ret = ymalloc((void **)&pagekey, MAX_READCACHE_KEY_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        readcache_trans(chkid, info_version, lba, pagekey);

        return multi_exec_append(&ent->arg.mctx, pagekey, buf, READCACHE_PAGE_SIZE, 0);

err_ret:
        return ret;
}

/*
 * Queue one aligned 4k page read on the pending MGET batch (key only, no
 * value).  NOTE(review): key-buffer ownership after multi_exec_append()
 * is not visible here; confirm it is released by multi_exec_destory().
 */
static int readcache_mget_append(entry_t *ent, const chkid_t *chkid, uint64_t info_version, uint64_t lba)
{
        int ret;
        char *pagekey;

        YASSERT(lba % READCACHE_PAGE_SIZE == 0);

        ret = ymalloc((void **)&pagekey, MAX_READCACHE_KEY_LEN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        readcache_trans(chkid, info_version, lba, pagekey);

        return multi_exec_append(&ent->arg.mctx, pagekey, NULL, 0, 0);

err_ret:
        return ret;
}

/* Submit the batched MSET to a worker and block until it completes. */
static int readcache_mset_exec(entry_t *ent)
{
        return __readcache_redis_commit(ent);
}

/* Submit the batched MGET to a worker and block until it completes. */
static int readcache_mget_exec(entry_t *ent)
{
        return __readcache_redis_commit(ent);
}

/* Tear down a batched MSET entry: release the mctx, then the entry itself
 * (*_ent is NULLed by yfree).  Always returns 0. */
static int readcache_mset_destory(entry_t **_ent)
{
        multi_exec_destory(&(*_ent)->arg.mctx);
        yfree((void **)_ent);

        return 0;
}

/* Tear down a batched MGET entry: release the mctx, then the entry itself
 * (*_ent is NULLed by yfree).  Always returns 0. */
static int readcache_mget_destory(entry_t **_ent)
{
        multi_exec_destory(&(*_ent)->arg.mctx);
        yfree((void **)_ent);

        return 0;
}

/*
 * Periodically publish the hit-ratio counters under READCACHE_INFO,
 * rate-limited to once per READCACHE_INFO_INTERVAL seconds.  Contention
 * is resolved by trywrlock: losers simply skip this dump.
 *
 * Fixes: '()' declarator tightened to '(void)'; the interval check was
 * done only outside the lock, so two racing callers could both pass it
 * and dump twice — it is now re-checked under the write lock.
 * NOTE(review): hit_ratio itself is read without synchronisation, matching
 * how __readcache_update_hit__ writes it (best-effort counters).
 */
static void __readcache_info_dump__(void)
{
        int ret;
        double percent;
        char tmp[MAX_BUF_LEN];
        time_t now = gettime();

        /* cheap unlocked check first */
        if (now - __last_dump__ < READCACHE_INFO_INTERVAL || !readcache_ready())
                return;

        if (hit_ratio.pages)
                percent = (double)hit_ratio.hit / hit_ratio.pages;
        else
                percent = 0.0;

        snprintf(tmp, MAX_BUF_LEN, "redis readcache info:\n"
                        "page count:%ju\n"
                        "hit count:%ju\n"
                        "hit ratio:%3.3f\n",
                        hit_ratio.pages,
                        hit_ratio.hit,
                        percent);

        ret = sy_rwlock_trywrlock(&info_lock);
        if (unlikely(ret))
                return;

        /* re-check under the lock: another dumper may have won the race */
        if (now - __last_dump__ >= READCACHE_INFO_INTERVAL) {
                ret = nodectl_set(READCACHE_INFO, tmp);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                __last_dump__ = now;
        }

        sy_rwlock_unlock(&info_lock);
}

/*
 * Accumulate per-IO statistics into the global counters, resetting both
 * before the page counter would wrap.
 *
 * Fix: the old guard "hit_ratio.pages + pages >= (uint64_t)-1" performed
 * the addition first, so when the sum wrapped the comparison saw a small
 * value and the reset never fired.  Compare without the overflowing add:
 * pages >= UINT64_MAX - hit_ratio.pages  <=>  hit_ratio.pages + pages
 * would reach UINT64_MAX.
 */
static void __readcache_update_hit__(uint64_t pages, uint64_t hit)
{
        if (pages >= ((uint64_t)-1) - hit_ratio.pages) {
                hit_ratio.pages = 0;
                hit_ratio.hit = 0;
        }

        hit_ratio.pages += pages;
        hit_ratio.hit += hit;
}

#if 0
/*
 * Original single-GET implementation of readcache_io_split, superseded by
 * the batched MGET version below (#else branch).
 *
 * NOTE(review): if re-enabled this will not compile as-is — it calls
 * __readcache_update_hit (missing trailing underscores; the function is
 * named __readcache_update_hit__), and the err_free label is never
 * jumped to (only ever falls through from the success path's return).
 */
int readcache_io_split(const io_t *_io, uint64_t info_version, buffer_t *head, buffer_t *tail, io_t *io_new)
{
        int ret;
        char *buf;
        uint64_t pages, hit = 0;
        uint64_t lba, off, offset1;
        int left, step;

        ret = ymalloc((void **)&buf, READCACHE_PAGE_SIZE);
        if (unlikely(ret)) {
                YASSERT(0);
        }

        off = _io->offset;
        left = _io->size;

        /* number of 4k pages the IO touches, rounded up */
        pages = left % READCACHE_PAGE_SIZE ? (left / READCACHE_PAGE_SIZE + 1) : (left / READCACHE_PAGE_SIZE);

        /* forward pass: peel cached pages off the front into 'head',
         * stopping at the first miss (ENOENT) */
        while(left > 0) {
                lba = OFF2LBA(off);
                step = _min(left, READCACHE_PAGE_SIZE - off % READCACHE_PAGE_SIZE);

                DBUG("head chkid %s off %ju lba %ju step %u left %u\n", id2str(&_io->id), off, lba, step, left);

                ret = readcache_get(&_io->id, info_version, lba, buf);
                if (unlikely(ret)) {
                        if (ret == EIO) {
                                UNIMPLEMENTED(__DUMP__);
                        } else if (ret == ENOENT) {
                                DBUG("chkid %s head stop, head from %ju -> %ju\n", id2str(&_io->id), _io->offset, off);
                                break;
                        }
                }

                hit += 1;
                mbuffer_copy(head, buf + off - lba, step);

                off += step;
                left -= step;
        };

        offset1 = off;

        /* backward pass: peel cached pages off the end into 'tail',
         * walking offsets in reverse */
        off = _io->offset + _io->size;
        while (left > 0) {
                lba = (off % READCACHE_PAGE_SIZE == 0) ? (off - READCACHE_PAGE_SIZE) : OFF2LBA(off);
                step = _min(left, off - lba);

                DBUG("tail chkid %s off %ju lba %ju step %u left %u\n", id2str(&_io->id), off, lba, step, left);

                ret = readcache_get(&_io->id, info_version, lba, buf);
                if (unlikely(ret)) {
                        if (ret == EIO) {
                                UNIMPLEMENTED(__DUMP__);
                        } else if (ret == ENOENT) {
                                DBUG("chkid %s tail stop, head from %ju -> %ju\n", id2str(&_io->id), _io->offset, off);
                                break;
                        }
                }

                hit += 1;
                mbuffer_appendmem_head(tail, buf, step);

                // in reverse order
                off -= step;
                left -= step;
        }

        /* the uncached middle that still has to be read from disk */
        *io_new = *_io;
        io_new->offset = offset1;
        io_new->size = left;

        __readcache_update_hit(pages, hit);
        __readcache_info_dump__();

        DBUG("io offset %ju size %u -> head %u load <%ju %u> tail %u\n",
                        _io->offset, _io->size,
                        head->len,
                        io_new->offset, io_new->size,
                        tail->len);
        yfree((void **)&buf);
        return 0;
err_free:
        yfree((void **)&buf);
        return ret;
}

#else

/*
 * Split an IO against the redis read cache using one batched MGET.  The
 * leading run of cached 4k pages is copied into 'head', the trailing run
 * into 'tail', and 'io_new' is narrowed to the uncached middle that still
 * has to be read from disk.  If the cache is not ready, the IO passes
 * through to 'io_new' untouched.
 *
 * Fixes: the return values of readcache_mget_init() and
 * readcache_mget_append() were ignored — a failed init left 'ent'
 * uninitialised and crashed on first use; the head-loop miss log said
 * "tail stop" (copy-paste).
 */
int readcache_io_split(const io_t *_io, uint64_t info_version, buffer_t *head, buffer_t *tail, io_t *io_new)
{
        int ret;
        mseg_t *seg;
        entry_t *ent;
        mctx_t *mctx;
        uint64_t pages, hit = 0;
        uint64_t lba, off, offset1;
        int left, step;
        struct list_head *pos, *n;

        if (!readcache_ready()) {
                *io_new = *_io;
                return 0;
        }

        off = _io->offset;
        left = _io->size;

        /* number of 4k pages the IO touches, rounded up */
        pages = left % READCACHE_PAGE_SIZE ? (left / READCACHE_PAGE_SIZE + 1) : (left / READCACHE_PAGE_SIZE);

        ANALYSIS_BEGIN(0);

        ret = readcache_mget_init(&ent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* queue one GET per page covered by the IO */
        while (left > 0) {
                lba = OFF2LBA(off);
                step = _min(left, READCACHE_PAGE_SIZE - off % READCACHE_PAGE_SIZE);

                ret = readcache_mget_append(ent, &_io->id, info_version, lba);
                if (unlikely(ret))
                        GOTO(err_destory, ret);

                off += step;
                left -= step;
        }

        ret = readcache_mget_exec(ent);
        if (unlikely(ret))
                GOTO(err_destory, ret);

        ANALYSIS_END(0, 1000, NULL);

        off = _io->offset;
        left = _io->size;
        mctx = &ent->arg.mctx;

        /* forward pass: consume cached pages from the front into 'head',
         * stopping at the first miss */
        list_for_each_safe(pos, n, &mctx->kvlist) {

                seg = (mseg_t *)pos;
                lba = OFF2LBA(off);
                step = _min(left, READCACHE_PAGE_SIZE - off % READCACHE_PAGE_SIZE);

                YASSERT(left > 0);

                if (seg->reply->type == REDIS_REPLY_STRING && seg->reply->len == READCACHE_PAGE_SIZE) {

                        hit += 1;
                        mbuffer_copy(head, seg->reply->str + off - lba, step);
                } else {
                        DBUG("chkid %s head stop, head from %ju -> %ju\n", id2str(&_io->id), _io->offset, off);
                        break;
                }

                off += step;
                left -= step;
        };

        offset1 = off;

        /* backward pass: consume cached pages from the end into 'tail' */
        off = _io->offset + _io->size;
        list_for_each_prev_safe(pos, n ,&mctx->kvlist) {

                if (left <= 0)
                        break;

                seg = (mseg_t *)pos;
                lba = (off % READCACHE_PAGE_SIZE == 0) ? (off - READCACHE_PAGE_SIZE) : OFF2LBA(off);
                step = _min(left, off - lba);

                DBUG("tail chkid %s off %ju lba %ju step %u left %u\n", id2str(&_io->id), off, lba, step, left);
                if (seg->reply->type == REDIS_REPLY_STRING && seg->reply->len == READCACHE_PAGE_SIZE) {

                        hit += 1;
                        mbuffer_appendmem_head(tail, seg->reply->str, step);
                } else {
                        DBUG("chkid %s tail stop, head from %ju -> %ju\n", id2str(&_io->id), _io->offset, off);
                        break;
                }

                off -= step;
                left -= step;
        }

        /* the uncached middle that still has to be read from disk */
        *io_new = *_io;
        io_new->offset = offset1;
        io_new->size = left;

        __readcache_update_hit__(pages, hit);
        __readcache_info_dump__();

        DBUG("io offset %ju size %u -> head %u load <%ju %u> tail %u\n",
                        _io->offset, _io->size,
                        head->len,
                        io_new->offset, io_new->size,
                        tail->len);

        readcache_mget_destory(&ent);
        return 0;

err_destory:
        readcache_mget_destory(&ent);
err_ret:
        return ret;
}

#endif

#if 0
/*
 * Original single-SET implementation of readcache_save, superseded by the
 * batched MSET version below (#else branch).  Walks the written buffer,
 * caching only the fully aligned 4k pages; partial head/tail fragments are
 * popped into 'mem' and discarded.
 */
int readcache_save(const io_t *_io, uint64_t info_version, const buffer_t *_buf)
{
        int ret = 0;
        uint64_t off;
        int left, step;
        buffer_t buf;

        /* work on a clone so the caller's buffer is left intact */
        mbuffer_init(&buf, 0);
        mbuffer_clone(&buf, (buffer_t *)_buf);

        void *mem = mem_cache_calloc(MEM_CACHE_4K, 0);

        DBUG("offset %ju size %u len %u\n", _io->offset, _io->size, _buf->len);

        YASSERT(_io->size == _buf->len);

        ANALYSIS_BEGIN(0);

        off = _io->offset;
        left = _io->size;

        while (left > 0) {
                step = _min(left, READCACHE_PAGE_SIZE - off % READCACHE_PAGE_SIZE);

                DBUG("readcache io %p off %ju step %u\n", _io, off, step);
                mbuffer_popmsg(&buf, mem, step);

                /* only whole, aligned pages are cached */
                if (step == READCACHE_PAGE_SIZE) {
                        YASSERT(off % READCACHE_PAGE_SIZE == 0);
                        ret = readcache_set(&_io->id, info_version, off, mem);
                        if (unlikely(ret)) {
                                YASSERT(0);
                                GOTO(err_ret, ret);
                        }
                }

                off += step;
                left -= step;
        }

        ANALYSIS_END(0, 0, NULL);

        YASSERT(buf.len == 0);

        mem_cache_free(MEM_CACHE_4K, mem);
        mbuffer_free(&buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, mem);
        mbuffer_free(&buf);

        return ret;
}
#else
int readcache_save(const io_t *_io, uint64_t info_version, const buffer_t *_buf)
{
        int ret = 0, left, step;
        void *mem;
        uint64_t off;
        buffer_t buf;
        entry_t *ent;
        char stash[READCACHE_PAGE_SIZE];

        mbuffer_init(&buf, 0);
        mbuffer_reference(&buf, (buffer_t *)_buf);

        DBUG("offset %ju size %u len %u\n", _io->offset, _io->size, _buf->len);

        YASSERT(_io->size == _buf->len);

        ANALYSIS_BEGIN(0);

        off = _io->offset;
        left = _io->size;

        readcache_mset_init(&ent);

        while (left > 0) {
                step = _min(left, READCACHE_PAGE_SIZE - off % READCACHE_PAGE_SIZE);

                DBUG("readcache io %p off %ju step %u\n", _io, off, step);

                if (step == READCACHE_PAGE_SIZE) {
                        YASSERT(off % READCACHE_PAGE_SIZE == 0);

                        ret = ymalloc((void **)&mem, step);
                        if (unlikely(ret))
                                GOTO(err_destory, ret);

                        mbuffer_popmsg(&buf, mem, step);
                        ret = readcache_mset_append(ent, &_io->id, info_version, off, mem);
                        if (unlikely(ret)) {
                                YASSERT(0);
                                GOTO(err_destory, ret);
                        }
                } else {
                        /* stash them */
                        mbuffer_popmsg(&buf, stash, step);
                }

                off += step;
                left -= step;
        }

        readcache_mset_exec(ent);
        ANALYSIS_END(0, 0, NULL);

        YASSERT(buf.len == 0);

        readcache_mset_destory(&ent);
        mbuffer_free(&buf);

        return 0;
err_destory:
        readcache_mset_destory(&ent);
        mbuffer_free(&buf);

        return ret;
}
#endif


#ifdef READCACHE_CHECK
/*
 * Debug-only verifier (compiled under READCACHE_CHECK): re-reads every
 * fully aligned page of an IO from the cache and memcmp's it against the
 * written buffer, asserting on any mismatch.
 *
 * NOTE(review): this calls readcache_get(), which is currently compiled
 * out under #if 0 above — building with READCACHE_CHECK defined will fail
 * to link until that path is restored.
 */
int readcache_check(const io_t *_io, uint64_t info_version, const buffer_t *_buf)
{
        int ret;

        buffer_t buf;
        off_t boundary, aligndown, offset = _io->offset;
        size_t size = _io->size;

        mbuffer_init(&buf, 0);
        mbuffer_reference(&buf, (buffer_t *)_buf);

        void *cache = mem_cache_calloc(MEM_CACHE_4K, 0);
        void *src = mem_cache_calloc(MEM_CACHE_4K, 0);

        /* skip the unaligned head fragment — it is never cached */
        aligndown = offset - round_down(offset, READCACHE_PAGE_SIZE) ;
        mbuffer_popmsg(&buf, src, aligndown);
        /* compare each whole page between the first and last page boundary */
        for (boundary = round_up(offset, READCACHE_PAGE_SIZE); \
                        boundary < round_down( offset + size, READCACHE_PAGE_SIZE); \
                        boundary += READCACHE_PAGE_SIZE)
        {
                mbuffer_popmsg(&buf, src, READCACHE_PAGE_SIZE);
                ret = readcache_get(&_io->id, info_version, boundary, cache);
                if (unlikely(ret)) {
                        YASSERT(0); // for test
                        GOTO(err_ret, ret);
                }
                if (memcmp(src, cache, READCACHE_PAGE_SIZE))
                        YASSERT(0);
        }

        /* only the unaligned tail fragment may remain unconsumed */
        YASSERT(buf.len < READCACHE_PAGE_SIZE);

        DBUG("read cache check succ\n");
        mbuffer_free(&buf);
        mem_cache_free(MEM_CACHE_4K, cache);
        mem_cache_free(MEM_CACHE_4K, src);
        return 0;
err_ret:
        mbuffer_free(&buf);
        mem_cache_free(MEM_CACHE_4K, cache);
        mem_cache_free(MEM_CACHE_4K, src);
        YASSERT(0);
        return ret;
}
#endif


